package com.jie.flink.cdc.mycdc;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * Flink job that copies the {@code organization_info} table from one MySQL database to another
 * using Flink SQL.
 *
 * <p>The source table is declared with the {@code mysql-cdc} connector (database
 * {@code quarant_db} on {@code localhost:3306}); the target table is declared with the
 * {@code jdbc} connector (database {@code testdb} on {@code localhost:3306}). A single
 * {@code INSERT INTO ... SELECT *} statement moves rows from the source to the target.
 *
 * <p>NOTE(review): host names, user names and passwords are hard-coded in the DDL strings below;
 * they should be moved to external configuration before this is used outside a local setup.
 */
public class FlinkMysqlToMysql {

    /** Maximum number of restart attempts for the fixed-delay restart strategy. */
    private static final int RESTART_ATTEMPTS = 3;

    /** Delay between restart attempts, in milliseconds. */
    private static final long RESTART_DELAY_MS = 5000L;

    /** Interval between checkpoints, in milliseconds. */
    private static final long CHECKPOINT_INTERVAL_MS = 5000L;

    /**
     * Column list shared by the source and target table definitions; the two must match because
     * the copy statement uses {@code SELECT *}.
     *
     * <p>Text columns are declared as STRING: in Flink SQL a VARCHAR without an explicit length
     * is VARCHAR(1), which does not describe multi-character values correctly.
     */
    private static final String TABLE_SCHEMA =
            "id bigint,organization_code STRING, organization_name STRING, "
                    + "parent_code STRING, parent_name STRING,PRIMARY KEY (id) NOT ENFORCED ";

    /**
     * Configures the execution environment, registers the source and target tables, and runs
     * the INSERT statement that copies the data.
     *
     * @param args command-line arguments (not used)
     */
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRestartStrategy(
                RestartStrategies.fixedDelayRestart(RESTART_ATTEMPTS, RESTART_DELAY_MS));
        env.enableCheckpointing(CHECKPOINT_INTERVAL_MS);
        env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);

        // Create the table environment in streaming mode.
        EnvironmentSettings settings =
                EnvironmentSettings.newInstance().inStreamingMode().build();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env, settings);

        // Register the source and target tables.
        registerSourceTable(tEnv);
        registerTargetTable(tEnv);

        // Run the CDC copy and print the result of the INSERT statement.
        // NOTE(review): print() presumably keeps main() alive while the streaming job runs;
        // confirm against the Flink version in use.
        String query = "INSERT INTO targetTable SELECT * FROM sourceTable";
        tEnv.executeSql(query).print();
    }

    /** Registers {@code sourceTable}; the source connector must be {@code mysql-cdc}. */
    private static void registerSourceTable(StreamTableEnvironment tEnv) {
        tEnv.executeSql("create table sourceTable(" + TABLE_SCHEMA + ") WITH (\n" +
                "'connector' = 'mysql-cdc'," +
                "'hostname' = 'localhost',\n" +
                " 'port' = '3306',\n" +
                " 'database-name' = 'quarant_db',\n" +
                " 'table-name' = 'organization_info',\n" +
                " 'username' = 'root',\n" +
                " 'password' = 'admin'\n" +
                ")");
    }

    /** Registers {@code targetTable}, which is written through the {@code jdbc} connector. */
    private static void registerTargetTable(StreamTableEnvironment tEnv) {
        tEnv.executeSql("create table targetTable(" + TABLE_SCHEMA + ") WITH (\n" +
                "'connector' = 'jdbc'," +
                "'url' = 'jdbc:mysql://localhost:3306/testdb?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=false',\n" +
                " 'table-name' = 'organization_info',\n" +
                " 'username' = 'root',\n" +
                " 'driver' = 'com.mysql.cj.jdbc.Driver',\n" +
                " 'password' = 'admin'\n" +
                ")");
    }
}
/*
————————————————

                            版权声明：本文为博主原创文章，遵循 CC 4.0 BY-SA 版权协议，转载请附上原文出处链接和本声明。
                        
原文链接：https://blog.csdn.net/letterss/article/details/131128378*/
